#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Distributed data analysis module
# Author: 王成 (Wang Cheng)
# Date: 2017-04-10
import _strptime
from datetime import datetime,date
import os,sys,time,timeit,random
import logging
from logging.handlers import TimedRotatingFileHandler,RotatingFileHandler
import multiprocessing,threading
import redis,Queue
import yaml,json,base64
import math
import requests
import contextlib
from urllib2 import urlopen
import uuid,xxhash
import traceback
from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement, SimpleStatement
from cassandra import OperationTimedOut,WriteTimeout
import pickle
from daemon import Daemon
reload(sys)
sys.setdefaultencoding('utf-8')

class Version:
    # Publish timestamp of this build, formatted as 'YYYY-MM-DD HH:MM:SS'.
    publish_time = '2017-05-15 17:18:02'
    
class Dispatcher(multiprocessing.Process):
    def __init__(self,config,cpu_count):
        """Set up one worker process (nothing is started here).

        Args:
            config: parsed configuration mapping; ``config['worker']['thread_num']``
                is the total number of processing threads wanted across all
                worker processes.
            cpu_count: number of worker processes being launched; the thread
                total is divided by it (rounded up) to get this process's share.
        """
        multiprocessing.Process.__init__(self)
        self.daemon = True
        self.config = config
        # Threads this process must run so that all processes together reach thread_num.
        self.thread_num = int(math.ceil(float(config['worker']['thread_num'])/cpu_count))
        self.threads = []  # every thread started by run()
        self.try_times = 5  # maximum attempts per message
        self.queen_size = 500  # soft cap checked against qsize() by producers
        # Both queues are unbounded (maxsize 0); queen_size is enforced manually.
        self.message_queen = Queue.Queue(0)  # raw messages waiting for analysis
        self.result_queen = Queue.Queue(0)  # analysed records waiting to be saved
        # One shared session so HTTP connections are pooled and reused.
        self.sreq = requests.Session()
        a = requests.adapters.HTTPAdapter(pool_connections = 1000, pool_maxsize = 1000,max_retries=3)
        self.sreq.mount('http://', a)
        # processing() fetches every URL that starts with 'http', which includes
        # https, so give https the same pool size and retry policy.
        self.sreq.mount('https://', a)
        # Maps a vehicle year id to a dict with 'brandID', 'modelID' and 'levelID'.
        self.yearid2info = {}
        year_file = os.path.dirname(os.path.abspath(__file__))+"/data/yearid_to_info_dict.dat"
        if os.path.isfile(year_file):
            # NOTE(review): unpickling executes arbitrary code; this file must
            # only ever come from a trusted source.
            try:
                # Pickle data is binary, so read it in binary mode.
                with open(year_file, 'rb') as f:
                    self.yearid2info = pickle.load(f)
            except Exception as e:
                # Treat a corrupt file like a missing one: log it and keep the
                # empty mapping instead of aborting worker construction.
                logging.exception('加载yearID映射字典时错误: %s', str(e))
        else:
            logging.error('加载yearID映射字典时错误: 未能找到指定文件')
        
    def run(self):
        """Entry point of the child process.

        Creates the redis clients, registers the Lua script behind
        ``self.query_cache``, then builds, starts and joins the worker threads:
        one ``get_message`` reader, one ``save_result`` writer and
        ``self.thread_num`` ``processing`` threads.
        """
        logging.info('%s pid %d', self.name,os.getpid())

        # Caches of prepared INSERT statements keyed by table date suffix.
        self.prepare_today = {}
        self.prepare_todayh = {}
        self.prepare_d = {}

        cache_socket = self.config['redis_cache']['unix_socket_path']
        quick_socket = self.config['redis_quick']['unix_socket_path']
        self.redis_cache = redis.StrictRedis(unix_socket_path=cache_socket)
        self.redis_quick = redis.StrictRedis(unix_socket_path=quick_socket)

        # The script compares ARGV[1] with the number stored at KEYS[1]:
        #   key missing        -> store ARGV[1] for 3600 s, return -1
        #   stored < ARGV[1]   -> store ARGV[1] for 3600 s, return the difference
        #   otherwise          -> return 0
        cache_script = """
        local value1 = redis.call('GET', KEYS[1])
        value1 = tonumber(value1)
        local value2 = tonumber(ARGV[1])
        if(value1)
        then
            if(value1<value2)
            then
                redis.call('SETEX', KEYS[1],3600, value2)
                return value2 - value1
            else
                return 0
            end
        else
            redis.call('SETEX', KEYS[1],3600, value2)
            return -1
        end
        """
        self.query_cache = self.redis_cache.register_script(cache_script)

        # One reader thread per process.
        self.threads.append(threading.Thread(target=self.get_message))

        # One saver thread per process.
        saver_total = 1
        for _ in xrange(saver_total):
            self.threads.append(threading.Thread(target=self.save_result))

        # The analysis threads, each with a recognisable name.
        self.threads.extend(
            threading.Thread(target=self.processing, name=self.name+'>processing-'+str(idx))
            for idx in xrange(self.thread_num)
        )

        # Start the threads one second apart, then block until all have ended.
        for worker in self.threads:
            worker.start()
            time.sleep(1)

        for worker in self.threads:
            worker.join()

    def get_message(self):
        """Move raw messages from the redis list into ``self.message_queen``, forever.

        While the in-memory queue holds fewer than ``self.queen_size`` items,
        up to ten messages are popped per round trip through a pipeline. Each
        message is queued as ``{'message': raw, 'try_times': 0}``.
        """
        redis_mq = redis.StrictRedis(unix_socket_path=self.config['redis_mq']['unix_socket_path'])
        pipe = redis_mq.pipeline()
        while 1:
            if self.queen_size <= self.message_queen.qsize():
                # Consumers are behind; give them time to drain the queue.
                time.sleep(1)
                continue

            try:
                for x in xrange(10):
                    pipe.lpop((self.config['redis_mq']['queue_key_name']))
                messages = pipe.execute()
            except Exception as e:
                logging.exception('从redis抽取数据时错误: %s', str(e))
                # Back off before retrying; without this a redis outage becomes
                # a tight loop that burns CPU and floods the log.
                time.sleep(1)
                continue

            missing = 0
            for msg in messages:
                if msg:
                    self.message_queen.put({'message':msg,'try_times':0})
                else:
                    missing += 1
            if missing:
                # 0.05 s per empty pop: the emptier the list, the longer the pause.
                time.sleep(0.05 * missing)
                
    def save_result(self):
        """Write analysed records to Scylla and forward them to the quick redis queue.

        Runs forever. Connects to Scylla (retrying every 10 s on failure), then
        for each record taken from ``self.result_queen``:

        1. rejects records captured more than 180 days ago or more than 30
           days in the future;
        2. computes ``last_captured``, the seconds since this plate was last
           seen, from the redis cache first and the ``last_captured`` table
           second. 99999999 means no earlier sighting was found (or the lookup
           failed); 0 is used for plates of 6 characters or fewer and for
           captures not newer than the stored one;
        3. adds INSERTs for the ``d_<capture date>`` and ``today_<current hour>``
           tables, plus ``last_captured`` when the value is positive, to a batch
           that is executed once it holds 8 statements or the timer exceeds 1 s;
        4. pushes the record, without ``data_bin``, as JSON onto the quick redis
           list through a pipeline executed after 1 s or more than 100 entries.

        When the queue stays empty for 10 s the pending batch and pipeline are
        flushed so the last records of a burst are not held back indefinitely.
        """
        def prepare_insert(session, table_name):
            # Prepare the 15-column INSERT for one dated table.
            # Returns None when preparation fails; the failure is already logged.
            try:
                return session.prepare("INSERT INTO "+table_name+" (info_id,capture_time,data_bin,region_id,location_id,location_type_id,device_id,lane_id,direction_id,speed,license_plate,lp_type,image_url1,has_image,last_captured) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);")
            except Exception as e:
                logging.exception('save scylla error: %s', str(e))
                return None

        def flush_pending(session, batch, pipe_quick):
            # Send whatever is still buffered. Failures are logged and the
            # buffers are left as they are, so the next flush retries them.
            try:
                if len(batch) > 0:
                    session.execute(batch)
                    batch.clear()
                if len(pipe_quick) > 0:
                    pipe_quick.execute()
            except Exception as e:
                logging.exception('保存数据时错误: %s', str(e))

        while 1:
            cluster = None
            try:
                cluster = Cluster(self.config['scylla']['hosts'],max_schema_agreement_wait = 30,cql_version='3.3.1',connect_timeout=60)
                session = cluster.connect()
                session.set_keyspace("yisa_oe")
                pipe_quick = self.redis_quick.pipeline()
                # Prepared inside the try so a failure is retried instead of
                # killing this thread.
                prepared3 = session.prepare("INSERT INTO last_captured (license_plate,capture_time) VALUES(?,?);")
                # Bound parameter instead of string formatting: the plate comes
                # from the incoming message and must not be spliced into CQL.
                select_last = session.prepare("SELECT capture_time FROM last_captured WHERE license_plate=?")
            except Exception as e:
                logging.exception('连接scylla时错误: %s', str(e))
                if cluster is not None:
                    # Release the failed attempt's resources before retrying.
                    try:
                        cluster.shutdown()
                    except Exception as shutdown_error:
                        logging.exception('连接scylla时错误: %s', str(shutdown_error))
                time.sleep(10)
                continue

            batch = BatchStatement(consistency_level=ConsistencyLevel.ONE)
            start = timeit.default_timer()
            while 1:
                try:
                    result = self.result_queen.get(timeout=10)
                except Queue.Empty:
                    flush_pending(session, batch, pipe_quick)
                    continue

                try:
                    tn = datetime.now()
                    capture_time_to_num = int(time.mktime(result['capture_time'].timetuple()))
                    if (tn-result['capture_time']).days >180 or (result['capture_time']-tn).days >30:
                        # Implausible capture time: drop the record.
                        logging.error('信息拍摄时间错误: %s,%s,%s' % (result['loc_id'],result['dev_id'],result['capture_time'].strftime('%Y%m%d')))
                        continue

                    # 32 hex digits: capture timestamp + xxh64 of the image URL + 4 random bytes.
                    my_uuid = uuid.UUID(hex(capture_time_to_num)[2:] + xxhash.xxh64(result['image_url1']).hexdigest() + os.urandom(4).encode('hex'))
                    capture_time_to_str = result['capture_time'].strftime('%Y%m%d')
                    todayh_to_str = tn.strftime('%Y%m%d%H')

                    last_captured = 99999999
                    plate = result['license_plate']
                    if len(plate)>6:  # shorter values are treated as "no plate"
                        last_cache = self.query_cache(keys=[plate], args=[capture_time_to_num])
                        if last_cache>-1:  # cache hit
                            last_captured = last_cache
                        else:
                            try:
                                cass_row = session.execute(select_last, (plate,))
                                if cass_row:
                                    last_capture_time = int(cass_row[0][0])
                                    if last_capture_time < capture_time_to_num:
                                        last_captured = capture_time_to_num - last_capture_time
                                    else:
                                        last_captured = 0
                            except Exception as e:
                                logging.exception('查询scylla时错误: %s', str(e))
                    else:
                        last_captured = 0

                    # Look up the prepared INSERT for the hourly table, creating it on first use.
                    if todayh_to_str not in self.prepare_todayh:
                        prepared1 = prepare_insert(session, "today_"+todayh_to_str)
                        if prepared1 is None:
                            logging.error('预编译scylla1时错误: %s',todayh_to_str)
                            time.sleep(10)
                            self.result_queen.put(result)  # put back for a later retry
                            continue
                        # Only the current hour is ever looked up, so replace the
                        # cache instead of collecting one entry per hour forever.
                        self.prepare_todayh = {todayh_to_str: prepared1}

                    # Same for the per-capture-date table.
                    if capture_time_to_str not in self.prepare_d:
                        prepared2 = prepare_insert(session, "d_"+capture_time_to_str)
                        if prepared2 is None:
                            logging.error('预编译scylla2时错误: %s',capture_time_to_str)
                            time.sleep(10)
                            self.result_queen.put(result)  # put back for a later retry
                            continue
                        self.prepare_d[capture_time_to_str] = prepared2

                    # Both tables receive the same row; capture_time is stored in milliseconds.
                    row = (my_uuid,capture_time_to_num*1000,result['data_bin'],result['region_id'],result['location_id'],0,result['device_id'],result['lane_id'],result['direction_id'],result['speed'],result['license_plate'],result['plate_type_id1'],result['image_url1'],result['has_image'],last_captured)
                    batch.add(self.prepare_d[capture_time_to_str], row)
                    batch.add(self.prepare_todayh[todayh_to_str], row)
                    if last_captured>0:
                        batch.add(prepared3,(result['license_plate'],capture_time_to_num))
                    if timeit.default_timer()-start>1 or len(batch)>=8:
                        session.execute(batch)
                        batch.clear()

                    # The redis copy carries no binary payload and uses JSON-friendly types.
                    del result['data_bin']
                    result['last_captured'] = last_captured
                    result['info_id'] = str(my_uuid)
                    result['capture_time'] = result['capture_time'].strftime('%Y-%m-%d %H:%M:%S')
                    pipe_quick.rpush(self.config['redis_quick']['queue_key_name'], json.dumps(result))
                    if len(pipe_quick)==1:
                        # First entry since the last flush: restart the 1 s timer.
                        start = timeit.default_timer()
                    if timeit.default_timer()-start>1 or len(pipe_quick)>100:
                        pipe_quick.execute()
                except (OperationTimedOut, WriteTimeout) as e:
                    # NOTE(review): the batch is not cleared on a timeout, so the
                    # re-queued record is added to it a second time (with a new
                    # uuid) on retry - confirm whether duplicate rows are acceptable.
                    logging.error('保存到scylla时错误: %s', str(e))
                    self.result_queen.put(result)  # put back for a later retry
                except Exception as e:
                    logging.exception('保存数据时错误: %s', str(e))
                    time.sleep(5)

    def processing(self):
        """Analysis worker loop: download images, recognise vehicles, queue results.

        Messages are taken from ``self.message_queen``, their image is fetched,
        and they are collected into batches of 8. Each batch goes through
        ``_process_batch``. A partial batch is processed as well once the
        queue has been empty for 10 s, so the tail of a burst is not held back.
        A failed download puts the message back on the queue until
        ``self.try_times`` attempts have been used; after that the record
        continues without an image.
        """
        thread = threading.current_thread()
        logging.warning('线程ID [%s]', thread.getName())
        batch_msg = []
        while 1:
            try:
                try:
                    message_info = self.message_queen.get(timeout=10)
                except Queue.Empty:
                    if batch_msg:
                        # Detach the batch before processing so that a failure
                        # can never leave it behind to be processed again.
                        pending, batch_msg = batch_msg, []
                        self._process_batch(pending, thread)
                    continue

                message = message_info['message']
                try_times = message_info['try_times']+1
                try:
                    json_msg = json.loads(message)
                except Exception as e:
                    logging.exception('转json对象时错误: %s %s', str(e), message)
                    continue

                image_url = json_msg['image_url']
                try:
                    img_data = self._fetch_image(image_url)
                except Exception as e:
                    if try_times < self.try_times:
                        # Hand the message back for another attempt.
                        self.message_queen.put({'message':message,'try_times':try_times})
                        continue
                    logging.error('下载图片时错误: [%s] %s', image_url,str(e))
                    img_data = ''
                json_msg['img_data'] = img_data

                # Keep the raw message and its attempt count with the parsed
                # form so each record can be re-queued individually later.
                batch_msg.append({'json':json_msg,'message':message,'try_times':try_times})
                if len(batch_msg)>=8:
                    pending, batch_msg = batch_msg, []
                    self._process_batch(pending, thread)
            except Exception as e:
                logging.exception('分析过车数据时错误: %s', str(e))

    def _fetch_image(self, image_url):
        """Return the raw image bytes for *image_url*, or '' for an unsupported address."""
        if image_url.startswith('http'):
            return self.sreq.get(image_url, timeout=3).content
        if image_url.startswith('ftp'):
            with contextlib.closing(urlopen(image_url, None, 10)) as r:
                return r.read()
        if image_url.startswith('/data'):
            # Image files are binary; the context manager closes the handle
            # even when the read fails.
            with open(image_url, 'rb') as file_obj:
                return file_obj.read()
        if len(image_url)>0:
            logging.error('无效的图片地址: [%s]', image_url)
        return ''

    def _build_result(self, msg):
        """Convert one parsed message into a result record with default recognition fields."""
        result = {
            'license_plate2': '',
            'brand_id': 0,
            'model_id': 0,
            'year_id': 0,  # vehicle year/model id
            'year_id_prob': 0,  # confidence of year_id
            'level_id': 0,  # vehicle class
            'plate_type_id2': 0,  # plate colour reported by the recogniser
            'detection': {},
            'color_id': 0,  # vehicle colour
            'front_rear': 0,  # front or rear view
            'graffiti': 0,  # lettering painted on the body
            'roof_rack': 0,
            'spare_tire': 0,
            'sunroof': 0,
            'occupant_left': 0,  # person on the left seat
            'occupant_right': 0,  # person on the right seat
            'sun_visor_left': 0,
            'sun_visor_right': 0,
            'num_certificates': 0,  # number of inspection stickers
            'pendant': 0,
            'accessory': 0,  # dashboard ornament
            'card': 0,
            'tissue': 0,  # tissue box
            'feature_b64': '',
            'data_bin': '',
            'has_image': 0,
            'is_heavy_truck': 0,
            'heavy_truck_cover': 0,
            'is_tank_truck': 0,
            'tricycle_manned': 0,
        }
        result['image_url1'] = msg['image_url']
        result['license_plate'] = msg['license_plate']
        result['capture_time'] = datetime.strptime(msg['capture_time'], '%Y-%m-%d %H:%M:%S')
        result['region_id'] = int(msg['region_id'])
        result['location_id'] = int(msg['location_id'])
        result['loc_id'] = str(msg['loc_id'])
        result['device_id'] = int(msg['device_id'])
        result['dev_id'] = str(msg['dev_id'])
        result['lane_id'] = int(msg['lane_id'])
        # str() lets a numeric direction_id through instead of failing on .isdigit().
        direction = str(msg['direction_id'])
        result['direction_id'] = int(direction) if direction.isdigit() else 0
        result['speed'] = int(msg['speed'])
        result['plate_type_id1'] = int(msg['plate_type_id'])  # plate colour from the source message
        return result

    def _recognize(self, need_recognize):
        """POST the batch's images to the recognition service.

        Returns the ``data`` member of the JSON reply, or None when the request
        raised or the status was not OK (both cases are logged).
        """
        payload = {'license_plate_numbers':[],'crop_x':[],'crop_y':[],'crop_w':[],'crop_h':[]}
        files = []
        for result, _entry in need_recognize:
            payload['license_plate_numbers'].append(result.get('license_plate', ''))
            # NOTE(review): _build_result never sets crop_* keys, so -1 is
            # always sent - confirm whether crops should come from the message.
            if 'crop_x' in result:
                payload['crop_x'].append(result['crop_x'])
                payload['crop_y'].append(result['crop_y'])
                payload['crop_w'].append(result['crop_w'])
                payload['crop_h'].append(result['crop_h'])
            else:
                payload['crop_x'].append(-1)
                payload['crop_y'].append(-1)
                payload['crop_w'].append(-1)
                payload['crop_h'].append(-1)
            files.append(('image_files', ('image', result['img_data'], 'image/jpg')))

        urls = ', '.join([result['image_url1'] for result, _entry in need_recognize])
        try:
            # NOTE(review): no timeout is set, so a hung service blocks this thread.
            r = self.sreq.post(self.config['fcgi']['url_vehicle'], files=files, data=payload)
            if r.status_code == requests.codes.ok:
                return r.json()['data']
            logging.error('识别接口异常: [%s]', urls)
        except Exception as e:
            logging.error('分析图片时错误: [%s] %s', urls,str(e))
        return None

    def _apply_vehicle_data(self, result, veh_data):
        """Copy one detected vehicle's recognition output into *result*."""
        label_id = veh_data['label_id']
        license_plates = veh_data['license_plates']

        if license_plates:
            # Only the first plate is used; vehicles with two plates are not handled.
            result['license_plate2'] = ''.join(license_plates[0]['number_chars'])
            result['plate_type_id2'] = license_plates[0]['type_id']
        if label_id:
            result['year_id'] = label_id[0]
            result['year_id_prob'] = veh_data['label_id_prob'][0]
        result['data_bin'] = base64.b64decode(veh_data['data_bin'])
        result['feature_b64'] = veh_data['feature_b64']
        if result['year_id'] in self.yearid2info:
            year_info = self.yearid2info[result['year_id']]
            result['brand_id'] = year_info['brandID']
            result['model_id'] = year_info['modelID']
            result['level_id'] = year_info['levelID']
        result['detection'] = veh_data['detection']

        attrs = veh_data['attrs']
        for key in ('color_id', 'front_rear', 'graffiti', 'roof_rack', 'spare_tire',
                    'sunroof', 'is_heavy_truck', 'heavy_truck_cover', 'is_tank_truck',
                    'tricycle_manned'):
            result[key] = attrs[key]
        result['veh_type_id'] = attrs['veh_type_id'][0]

        if 'fw_attrs' in veh_data:
            fw_attrs = veh_data['fw_attrs']
            for key in ('occupant_left', 'occupant_right', 'sun_visor_left',
                        'sun_visor_right', 'num_certificates', 'pendant',
                        'accessory', 'card', 'tissue'):
                result[key] = fw_attrs[key]

    def _process_batch(self, batch_msg, thread):
        """Build result records for *batch_msg*, recognise those with an image, queue them.

        Records whose image is not between 10000 and 100000000 bytes go to
        ``self.result_queen`` unrecognised. If the recognition request fails,
        every affected message is put back on ``self.message_queen`` while it
        still has attempts left. A message that cannot be converted, or whose
        recognition data cannot be applied, is logged and skipped without
        affecting the rest of the batch.
        """
        need_recognize = []
        for entry in batch_msg:
            msg = entry['json']
            try:
                result = self._build_result(msg)
            except Exception as e:
                # One malformed message must not take down the whole batch.
                logging.exception('分析过车数据时错误: %s', str(e))
                continue
            img_len = len(msg['img_data'])
            if img_len>10000 and img_len<100000000:
                result['has_image'] = 1
                result['img_data'] = msg['img_data']
                need_recognize.append((result, entry))
            else:
                self.result_queen.put(result)  # nothing to recognise, save as is
        if not need_recognize:
            return

        json_data = self._recognize(need_recognize)
        if json_data is None:
            # Retry each message on its own attempt counter.
            for result, entry in need_recognize:
                if entry['try_times'] < self.try_times:
                    self.message_queen.put({'message':entry['message'],'try_times':entry['try_times']})
                else:
                    logging.error('分析图片失败且重试次数已用尽: [%s]', result['image_url1'])
            return

        # zip() pairs records with replies positionally and ignores any surplus
        # on either side. NOTE(review): records without a reply entry, or whose
        # reply lists no vehicle, are not saved (unchanged from before) -
        # confirm that this is intended.
        for (result, _entry), img_data_info in zip(need_recognize, json_data):
            del result['img_data']
            for veh_data in img_data_info:
                try:
                    self._apply_vehicle_data(result, veh_data)
                except Exception as e:
                    logging.exception('分析过车数据时错误: %s', str(e))
                    break
                # Hold back while the saver is behind.
                while self.queen_size < self.result_queen.qsize():
                    time.sleep(10)
                    logging.warning('结果队列已满: [%s]', thread.getName())
                self.result_queen.put(result)
                break  # one result per image: only the first vehicle is used

class MyDaemon(Daemon):
    """Daemon wrapper that loads the configuration and supervises the worker processes."""

    def run(self):
        """Load config.yaml, set up logging, then start and join the Dispatcher processes.

        The number of processes is the smaller of the machine's CPU count and
        ``config['worker']['max_process']``.
        """
        config_path = os.path.dirname(os.path.abspath(__file__)) + '/config.yaml'
        # The context manager closes the file even when YAML parsing raises.
        with open(config_path) as config_file:
            config = yaml.safe_load(config_file)

        name = 'yisa_worker'
        logging.basicConfig(level=logging.INFO)
        # Rotating log file: 128 MiB per file, 7 backups.
        handler = RotatingFileHandler('/var/log/%s.log' % name, maxBytes=134217728, backupCount=7)
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logging.getLogger().addHandler(handler)
        # Keep urllib3's connection chatter out of the log.
        requests_log = logging.getLogger("requests.packages.urllib3")
        requests_log.setLevel(logging.ERROR)
        logging.warning('启动 [%s]', name)
        logging.warning('主线程 Pid [%d]', os.getpid())

        cpu_count = multiprocessing.cpu_count()
        cpu_count = min(config['worker']['max_process'],cpu_count)
        logging.warning('CPU线程数 [%d]', cpu_count)

        process = [Dispatcher(config,cpu_count) for x in xrange(0, cpu_count)]
        for p in process:
            p.start()
        try:
            for p in process:
                p.join()
        except KeyboardInterrupt:
            for p in process:
                p.terminate()
            logging.warning('Ctrl+C,终止运行')
        #================= for local testing only ======================

if __name__ == "__main__":
    daemon = MyDaemon('/var/run/yisa_worker.pid')
    # To debug in the foreground, call daemon.run() here instead of start().

    # Exactly one argument is expected: the command.
    if len(sys.argv) != 2:
        print("usage: %s start|stop|restart" % sys.argv[0])
        sys.exit(2)

    command = sys.argv[1]
    if command not in ('start', 'stop', 'restart'):
        print("Unknown command")
        sys.exit(2)

    # Each accepted command is named after the Daemon method that performs it.
    getattr(daemon, command)()
    sys.exit(0)

